fix(esc): worker-thread iteration for OpenAI-compat streaming (LiteLLM hang fix) #148
Merged
Conversation
PRs #145 / #146 added mid-stream ESC cancellation for OpenAI-compatible providers via a `stream.response.close()` listener. That works for direct connections, but a user on LiteLLM-proxied Claude Opus 4.7 reported it still hangs indefinitely: `response.close()` from the listener thread doesn't actually terminate the SDK's blocking socket read in that configuration, so the iterator keeps draining bytes and the agent loop's cancel boundary never sees the abort.

Two earlier attempts confirmed the diagnosis:

* Adding `signal.pthread_kill(main_thread, SIGINT)` to the listener *did* unblock the iteration, but it propagated `KeyboardInterrupt` uncaught past every layer of exception handling, exiting the entire `python -m src.cli` process. The signal hits the main thread at the next bytecode boundary, which is rarely inside our narrow `except KeyboardInterrupt` window.
* The TypeScript reference at `typescript/src/services/api/openaiShim.ts` works because JavaScript's native `fetch` + `AbortSignal` integration makes `reader.read()` reject with `AbortError` when the signal trips. Python's sync `httpx` has no equivalent.

Fix: decouple the main thread's response time from the SDK's cooperation. The SDK iteration runs on a daemon worker thread that pushes chunks into a `queue.Queue`. The main thread polls with a 100 ms timeout and re-checks `guard.aborted` between ticks. On abort, raise `AbortError` immediately and orphan the worker; it dies when the underlying connection eventually closes. The cost is some wasted bandwidth on the orphaned read; the benefit is that ESC unwinds within ~100 ms regardless of LiteLLM / httpx / chunked-transfer behavior.

Specifics:

* Worker pushes each chunk into the queue, then a `_DONE` sentinel in a `finally` so the main thread can break the loop on either normal exhaustion or worker exception.
* Worker catches `BaseException` so `KeyboardInterrupt` / `SystemExit` from the worker thread are routed back to the main thread for re-raise rather than silently dropped.
* Main-thread loop checks abort in the `Queue.Empty` branch (bounds ESC latency at one tick) AND after processing each chunk (preserves the chunk we just received, matching the in-loop-check semantics pinned by the existing chunk-list regression test).
* Abort sites call `guard.raise_if_post_aborted()` instead of hardcoding `AbortError("user_interrupt")` so a non-default abort reason on the signal (future `"rate_limit_backoff"` etc.) is preserved.
* Updated method docstring explains the worker indirection and the Python-vs-JS-fetch contrast so the next engineer touching this doesn't "simplify" the queue away.

Two new regression tests:

* `test_abort_unwinds_promptly_even_when_iterator_never_returns` uses a `_StuckStream` whose `__iter__` blocks on a never-set Event. Without the worker+queue, the main thread would block on `next(stream)` forever; mutation-verified by reverting to the pre-fix code and watching pytest hang past an 8-second `gtimeout`.
* `test_normal_completion_still_captures_final_usage` pins the drain invariant: the main thread must process the final empty-choices/populated-usage chunk before breaking on `_DONE`, otherwise OpenAI's `stream_options.include_usage=True` token counts would silently regress.
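Not the actual source, but a minimal sketch of the loop described above; `stream`, `on_chunk`, and the `_iterate_with_abort` name stand in for the provider's internals, while `guard.aborted` / `guard.raise_if_post_aborted()` follow the names used in this PR:

```python
import queue
import threading

_DONE = object()  # sentinel: worker finished, normally or via exception

def _iterate_with_abort(stream, guard, on_chunk):
    q = queue.Queue()
    worker_error: list = []

    def _drain():
        try:
            for chunk in stream:        # may block indefinitely in the SDK's socket read
                q.put(chunk)
        except BaseException as exc:    # includes KeyboardInterrupt / SystemExit
            worker_error.append(exc)    # route back to the main thread for re-raise
        finally:
            q.put(_DONE)                # always unblock the main thread

    threading.Thread(target=_drain, daemon=True).start()

    while True:
        try:
            item = q.get(timeout=0.1)   # one tick bounds ESC latency at ~100 ms
        except queue.Empty:
            if guard.aborted:           # abort check between ticks
                guard.raise_if_post_aborted()  # preserves the signal's abort reason
            continue
        if item is _DONE:
            if worker_error:
                raise worker_error[0]
            return                      # normal exhaustion
        on_chunk(item)                  # keep the chunk we already received...
        if guard.aborted:               # ...then honor the in-loop abort check
            guard.raise_if_post_aborted()
```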
Anthropic and Minimax providers are unchanged. They use the `anthropic` SDK, whose `client.messages.stream(...)` works reliably against direct connections, and no user has reported the LiteLLM-style proxy buffering against them. If that changes, the same worker+queue pattern ports cleanly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Summary
* `response.close()` from another thread is purely advisory in this configuration.
* The SDK iteration now runs on a daemon worker thread that pushes chunks into a `queue.Queue`. The main thread polls with a 100 ms timeout, checks abort each tick, raises `AbortError` immediately, and orphans the worker.

Why this is needed
The TypeScript reference at `typescript/src/services/api/openaiShim.ts` works because of JavaScript's native `fetch` + `AbortSignal` integration: `reader.read()` rejects with `AbortError` when the signal trips. Python's sync `httpx` has no equivalent. Two earlier attempts confirmed the diagnosis:

* `signal.pthread_kill(main_thread, SIGINT)` *did* unblock the iteration but propagated `KeyboardInterrupt` uncaught past every layer of exception handling, exiting the entire `python -m src.cli` process. SIGINT lands at the next bytecode boundary, rarely inside our narrow `except KeyboardInterrupt` window.
* `response.close()` from another thread is best-effort across SDK + proxy + httpx version combinations. For LiteLLM-proxied chunked-transfer streams, the kernel-level socket read keeps receiving data even after the client-side response object is "closed."

The worker+queue pattern is the only portable design that makes the main thread's response time independent of the SDK's cooperation.
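For the record, the rejected signal-based attempt, reconstructed as a sketch (Unix-only; `on_esc_pressed` is an illustrative name):

```python
import signal
import threading

def on_esc_pressed() -> None:
    # Runs on the ESC listener thread. This does unblock next(stream), but
    # the resulting KeyboardInterrupt surfaces on the main thread at an
    # arbitrary bytecode boundary, almost never inside the narrow
    # `except KeyboardInterrupt` window, so it escapes every handler and
    # takes down the whole `python -m src.cli` process.
    signal.pthread_kill(threading.main_thread().ident, signal.SIGINT)
```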
Changes
`src/providers/openai_compatible.py`:

* SDK iteration moved to a daemon worker thread; the main thread polls a `queue.Queue` with a 100 ms timeout and checks `guard.aborted` between ticks
* worker catches `BaseException` so `KeyboardInterrupt` / `SystemExit` are routed back to main for re-raise
* abort sites call `guard.raise_if_post_aborted()` instead of hardcoded `AbortError("user_interrupt")` to preserve non-default abort reasons

`tests/test_openai_compat_abort_signal.py`: 2 new regression tests.

* `test_abort_unwinds_promptly_even_when_iterator_never_returns`: uses a `_StuckStream` whose `__iter__` blocks on a never-set `threading.Event`. Mutation-verified: reverting to pre-fix code made pytest hang past an 8-second `gtimeout`, confirming this test catches the LiteLLM regression.
* `test_normal_completion_still_captures_final_usage`: pins the drain invariant. OpenAI emits usage stats in the final empty-choices chunk; the main thread must process every queued chunk before breaking on `_DONE`, otherwise the `↓ N tokens` REPL spinner would silently lose count.
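A sketch of the stuck-stream fixture's shape, assuming only what the description above states (the real test file has more setup):

```python
import threading

class _StuckStream:
    """Simulates the LiteLLM hang: iteration blocks forever on a socket read."""

    def __init__(self) -> None:
        self._never_set = threading.Event()

    def __iter__(self):
        self._never_set.wait()  # never set, so this blocks the calling thread forever
        return iter(())         # unreachable

# The test feeds _StuckStream() to the provider, trips the abort guard, and
# asserts the call raises AbortError within a few 100 ms ticks. Pre-fix code
# blocked in next(stream) on the main thread and hung pytest indefinitely.
```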
Test plan

Trade-offs
`queue.Queue` is unbounded, so a worst-case "stuck iterator AND chunks flowing" path could accumulate megabytes. Flagged as a follow-up; not blocking on current evidence (the user's repro has the iterator stuck, not flowing).
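One possible shape for that follow-up, sketched with an assumed `aborted` callable (not part of this PR):

```python
import queue

def _put_bounded(q: queue.Queue, chunk, aborted) -> bool:
    """Worker-side put against a bounded queue, e.g. queue.Queue(maxsize=256).

    Retries with a short timeout so an orphaned worker observes the abort
    instead of blocking forever on a full queue; returns False to tell the
    worker to stop iterating and exit.
    """
    while not aborted():
        try:
            q.put(chunk, timeout=0.1)
            return True
        except queue.Full:
            continue
    return False
```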
Out of scope (explicit user request)

Not extending the worker+queue to `AnthropicProvider` or `MinimaxProvider`. Both use the `anthropic` SDK and work reliably against direct connections; no user has reported proxy-buffering issues against them. The same pattern ports cleanly if needed in the future.

🤖 Generated with Claude Code